agentmux_srv\backend\storage\filestore/
core.rs

1// Copyright 2025-2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4//! FileStore struct and CRUD operations.
5
6
7use std::collections::HashMap;
8use std::path::Path;
9use std::sync::{Arc, Mutex};
10use std::time::{SystemTime, UNIX_EPOCH};
11
12use rusqlite::{params, Connection};
13
14use super::cache::CacheEntry;
15use super::types::{FileMeta, FileOpts, WaveFile};
16use crate::backend::storage::error::StoreError;
17use crate::backend::storage::migrations::{
18    check_schema_compat, run_filestore_migrations, stamp_version, FILESTORE_SCHEMA_VERSION,
19};
20
/// Size in bytes of one stored data part: 64 KiB (matches Go's DefaultPartDataSize).
/// File contents are split into chunks of at most this many bytes.
pub(super) const PART_DATA_SIZE: usize = 64 * 1024;

/// Default flush interval in seconds; the background flusher ticks at this period.
#[allow(dead_code)]
pub const DEFAULT_FLUSH_SECS: u64 = 5;

/// Clean cache entries idle longer than this many seconds are evicted during flush.
#[allow(dead_code)]
pub const CACHE_TTL_SECS: u64 = 60;

/// Hard cap on the total byte size accounted to the metadata cache (128 MiB).
/// When this is exceeded, LRU eviction removes the oldest entries first.
pub const MAX_CACHE_BYTES: usize = 128 * 1024 * 1024;
35
/// SQLite-backed file storage with write-through cache.
///
/// Each piece of shared state sits behind its own `Mutex`. Where this file
/// nests the locks, `cache` is taken before `cache_total_bytes`.
pub struct FileStore {
    /// The SQLite connection; every query in this file locks it first.
    pub(super) conn: Mutex<Connection>,
    /// Cached per-file state, keyed by `(zone_id, name)`.
    pub(super) cache: Mutex<HashMap<(String, String), CacheEntry>>,
    /// Total bytes currently accounted for across all cache entries
    /// (the sum of each entry's `cached_size_bytes`).
    pub(super) cache_total_bytes: Mutex<usize>,
    /// Maximum bytes the cache may hold before LRU eviction kicks in.
    pub(super) cache_max_bytes: usize,
}
45
46impl FileStore {
47    /// Open a FileStore backed by a file on disk.
48    pub fn open(path: &Path) -> Result<Self, StoreError> {
49        let conn = Connection::open(path)?;
50        Self::configure_and_migrate(conn)
51    }
52
53    /// Open an in-memory FileStore for testing.
54    #[allow(dead_code)]
55    pub fn open_in_memory() -> Result<Self, StoreError> {
56        let conn = Connection::open_in_memory()?;
57        Self::configure_and_migrate(conn)
58    }
59
60    /// Open an in-memory FileStore with a custom LRU byte cap.  Used in tests.
61    #[allow(dead_code)]
62    pub fn open_in_memory_with_cap(max_bytes: usize) -> Result<Self, StoreError> {
63        let conn = Connection::open_in_memory()?;
64        let mut store = Self::configure_and_migrate(conn)?;
65        store.cache_max_bytes = max_bytes;
66        Ok(store)
67    }
68
69    fn configure_and_migrate(conn: Connection) -> Result<Self, StoreError> {
70        conn.execute_batch(
71            "PRAGMA journal_mode=WAL;
72             PRAGMA busy_timeout=5000;",
73        )?;
74        // Safety lock BEFORE migrations — same discipline as wstore /
75        // sagas: refuse to touch a newer-schema DB on disk before any
76        // mutating step runs. See `check_schema_compat` doc.
77        check_schema_compat(&conn, FILESTORE_SCHEMA_VERSION, "filestore.db")?;
78        run_filestore_migrations(&conn)?;
79        stamp_version(&conn, FILESTORE_SCHEMA_VERSION)?;
80        Ok(Self {
81            conn: Mutex::new(conn),
82            cache: Mutex::new(HashMap::new()),
83            cache_total_bytes: Mutex::new(0),
84            cache_max_bytes: MAX_CACHE_BYTES,
85        })
86    }
87
88    pub(super) fn now_ms() -> i64 {
89        SystemTime::now()
90            .duration_since(UNIX_EPOCH)
91            .unwrap_or_default()
92            .as_millis() as i64
93    }
94
95    /// Evict the least-recently-used cache entries until `cache_total_bytes <= cache_max_bytes`.
96    /// Must be called with *neither* `cache` nor `cache_total_bytes` lock held.
97    pub(super) fn evict_to_cap(&self) {
98        // Fast path: check total without evicting.
99        let total = *self.cache_total_bytes.lock().unwrap();
100        if total <= self.cache_max_bytes {
101            return;
102        }
103
104        // Collect (last_access_ms, key, size) for all entries, sort oldest-first.
105        let candidates: Vec<(i64, (String, String), usize)> = {
106            let cache = self.cache.lock().unwrap();
107            cache
108                .iter()
109                .map(|(k, e)| (e.last_access_ms, k.clone(), e.cached_size_bytes))
110                .collect()
111        };
112
113        // Sort by last_access_ms ascending (oldest first).
114        let mut candidates = candidates;
115        candidates.sort_by_key(|(ts, _, _)| *ts);
116
117        let mut evicted_count = 0usize;
118        let mut evicted_bytes = 0usize;
119
120        for (_, key, size) in candidates {
121            {
122                let total = *self.cache_total_bytes.lock().unwrap();
123                if total <= self.cache_max_bytes {
124                    break;
125                }
126            }
127            {
128                let mut cache = self.cache.lock().unwrap();
129                if cache.remove(&key).is_some() {
130                    let mut total = self.cache_total_bytes.lock().unwrap();
131                    *total = total.saturating_sub(size);
132                    evicted_count += 1;
133                    evicted_bytes += size;
134                }
135            }
136        }
137
138        if evicted_count > 0 {
139            tracing::debug!(
140                "filestore lru: evicted {} entries, freed {} bytes (cap={})",
141                evicted_count,
142                evicted_bytes,
143                self.cache_max_bytes,
144            );
145        }
146    }
147
148    /// Create a new file. Fails if file already exists.
149    #[allow(dead_code)]
150    pub fn make_file(
151        &self,
152        zone_id: &str,
153        name: &str,
154        meta: FileMeta,
155        opts: FileOpts,
156    ) -> Result<(), StoreError> {
157        let now = Self::now_ms();
158        let file = WaveFile {
159            zoneid: zone_id.to_string(),
160            name: name.to_string(),
161            size: 0,
162            createdts: now,
163            modts: now,
164            opts,
165            meta,
166        };
167
168        let conn = self.conn.lock().unwrap();
169        let exists: bool = conn
170            .query_row(
171                "SELECT 1 FROM db_wave_file WHERE zoneid = ?1 AND name = ?2",
172                params![zone_id, name],
173                |_| Ok(true),
174            )
175            .unwrap_or(false);
176
177        if exists {
178            return Err(StoreError::AlreadyExists);
179        }
180
181        let opts_json = serde_json::to_string(&file.opts)?;
182        let meta_json = serde_json::to_string(&file.meta)?;
183        conn.execute(
184            "INSERT INTO db_wave_file (zoneid, name, size, createdts, modts, opts, meta) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
185            params![file.zoneid, file.name, file.size, file.createdts, file.modts, opts_json, meta_json],
186        )?;
187
188        // Add to cache
189        let key = (zone_id.to_string(), name.to_string());
190        let entry = CacheEntry {
191            file: Some(file),
192            data_entries: HashMap::new(),
193            dirty: false,
194            last_access_ms: now,
195            cached_size_bytes: 64, // new file is size=0; charge minimum overhead
196        };
197        {
198            let mut cache = self.cache.lock().unwrap();
199            cache.insert(key, entry);
200            *self.cache_total_bytes.lock().unwrap() += 64;
201        }
202        self.evict_to_cap();
203
204        Ok(())
205    }
206
207    /// Delete a file and all its data parts.
208    #[allow(dead_code)]
209    pub fn delete_file(&self, zone_id: &str, name: &str) -> Result<(), StoreError> {
210        let conn = self.conn.lock().unwrap();
211        conn.execute(
212            "DELETE FROM db_wave_file WHERE zoneid = ?1 AND name = ?2",
213            params![zone_id, name],
214        )?;
215        conn.execute(
216            "DELETE FROM db_file_data WHERE zoneid = ?1 AND name = ?2",
217            params![zone_id, name],
218        )?;
219        drop(conn);
220
221        // Remove from cache
222        let key = (zone_id.to_string(), name.to_string());
223        let mut cache = self.cache.lock().unwrap();
224        if let Some(removed) = cache.remove(&key) {
225            let mut total = self.cache_total_bytes.lock().unwrap();
226            *total = total.saturating_sub(removed.cached_size_bytes);
227        }
228
229        Ok(())
230    }
231
232    /// Delete all files in a zone.
233    #[allow(dead_code)]
234    pub fn delete_zone(&self, zone_id: &str) -> Result<(), StoreError> {
235        // Get file names first for cache cleanup
236        let names: Vec<String> = {
237            let conn = self.conn.lock().unwrap();
238            let mut stmt = conn.prepare("SELECT name FROM db_wave_file WHERE zoneid = ?1")?;
239            let rows = stmt.query_map(params![zone_id], |row| row.get(0))?;
240            rows.filter_map(|r| r.ok()).collect()
241        };
242
243        let conn = self.conn.lock().unwrap();
244        conn.execute(
245            "DELETE FROM db_wave_file WHERE zoneid = ?1",
246            params![zone_id],
247        )?;
248        conn.execute(
249            "DELETE FROM db_file_data WHERE zoneid = ?1",
250            params![zone_id],
251        )?;
252        drop(conn);
253
254        let mut cache = self.cache.lock().unwrap();
255        let mut freed = 0usize;
256        for name in names {
257            if let Some(removed) = cache.remove(&(zone_id.to_string(), name)) {
258                freed += removed.cached_size_bytes;
259            }
260        }
261        if freed > 0 {
262            let mut total = self.cache_total_bytes.lock().unwrap();
263            *total = total.saturating_sub(freed);
264        }
265
266        Ok(())
267    }
268
269    /// Get file metadata. Returns None if file doesn't exist.
270    pub fn stat(&self, zone_id: &str, name: &str) -> Result<Option<WaveFile>, StoreError> {
271        // Check cache first
272        let key = (zone_id.to_string(), name.to_string());
273        {
274            let mut cache = self.cache.lock().unwrap();
275            if let Some(entry) = cache.get_mut(&key) {
276                entry.last_access_ms = Self::now_ms();
277                return Ok(entry.file.clone());
278            }
279        }
280
281        // Load from DB
282        let conn = self.conn.lock().unwrap();
283        let result = conn.query_row(
284            "SELECT zoneid, name, size, createdts, modts, opts, meta FROM db_wave_file WHERE zoneid = ?1 AND name = ?2",
285            params![zone_id, name],
286            |row| {
287                let opts_str: String = row.get(5)?;
288                let meta_str: String = row.get(6)?;
289                Ok(WaveFile {
290                    zoneid: row.get(0)?,
291                    name: row.get(1)?,
292                    size: row.get(2)?,
293                    createdts: row.get(3)?,
294                    modts: row.get(4)?,
295                    opts: serde_json::from_str(&opts_str).unwrap_or_default(),
296                    meta: serde_json::from_str(&meta_str).unwrap_or_default(),
297                })
298            },
299        );
300
301        match result {
302            Ok(file) => Ok(Some(file)),
303            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
304            Err(e) => Err(StoreError::Sqlite(e)),
305        }
306    }
307
308    /// Write (replace) entire file contents.
309    pub fn write_file(
310        &self,
311        zone_id: &str,
312        name: &str,
313        data: &[u8],
314    ) -> Result<(), StoreError> {
315        let key = (zone_id.to_string(), name.to_string());
316        let now = Self::now_ms();
317
318        // Split data into parts
319        let parts = Self::split_into_parts(data);
320
321        // Write directly to DB (write-through for full writes, matching Go's WriteFile)
322        let conn = self.conn.lock().unwrap();
323
324        // Verify file exists
325        let exists: bool = conn
326            .query_row(
327                "SELECT 1 FROM db_wave_file WHERE zoneid = ?1 AND name = ?2",
328                params![zone_id, name],
329                |_| Ok(true),
330            )
331            .unwrap_or(false);
332        if !exists {
333            return Err(StoreError::NotFound);
334        }
335
336        // Update file size
337        conn.execute(
338            "UPDATE db_wave_file SET size = ?1, modts = ?2 WHERE zoneid = ?3 AND name = ?4",
339            params![data.len() as i64, now, zone_id, name],
340        )?;
341
342        // Replace all data parts
343        conn.execute(
344            "DELETE FROM db_file_data WHERE zoneid = ?1 AND name = ?2",
345            params![zone_id, name],
346        )?;
347        for (idx, part_data) in parts.iter().enumerate() {
348            conn.execute(
349                "INSERT INTO db_file_data (zoneid, name, partidx, data) VALUES (?1, ?2, ?3, ?4)",
350                params![zone_id, name, idx as i32, part_data],
351            )?;
352        }
353        drop(conn);
354
355        // Update cache (metadata only — data parts are already in DB, read_file loads from DB)
356        {
357            let new_size = data.len().max(64);
358            let mut cache = self.cache.lock().unwrap();
359            if let Some(entry) = cache.get_mut(&key) {
360                let old_size = entry.cached_size_bytes;
361                if let Some(ref mut file) = entry.file {
362                    file.size = data.len() as i64;
363                    file.modts = now;
364                }
365                entry.last_access_ms = now;
366                entry.cached_size_bytes = new_size;
367                let delta = new_size as i64 - old_size as i64;
368                let mut total = self.cache_total_bytes.lock().unwrap();
369                if delta >= 0 {
370                    *total += delta as usize;
371                } else {
372                    *total = total.saturating_sub((-delta) as usize);
373                }
374            }
375        }
376        self.evict_to_cap();
377
378        Ok(())
379    }
380
381    /// Read entire file contents.
382    pub fn read_file(&self, zone_id: &str, name: &str) -> Result<Option<Vec<u8>>, StoreError> {
383        // Get file metadata
384        let file = match self.stat(zone_id, name)? {
385            Some(f) => f,
386            None => return Ok(None),
387        };
388
389        if file.size == 0 {
390            return Ok(Some(Vec::new()));
391        }
392
393        let data_len = file.data_length();
394        let start_idx = file.data_start_idx();
395        let num_parts = ((start_idx + data_len - 1) / PART_DATA_SIZE as i64 + 1) as i32;
396        let start_part = (start_idx / PART_DATA_SIZE as i64) as i32;
397
398        // Load parts from DB
399        let conn = self.conn.lock().unwrap();
400        let mut stmt = conn.prepare(
401            "SELECT partidx, data FROM db_file_data WHERE zoneid = ?1 AND name = ?2 ORDER BY partidx",
402        )?;
403        let rows = stmt.query_map(params![zone_id, name], |row| {
404            Ok((row.get::<_, i32>(0)?, row.get::<_, Vec<u8>>(1)?))
405        })?;
406
407        let mut parts_map: HashMap<i32, Vec<u8>> = HashMap::new();
408        for row in rows {
409            let (idx, data) = row?;
410            parts_map.insert(idx, data);
411        }
412        drop(stmt);
413        drop(conn);
414
415        // Assemble data
416        let mut result = Vec::with_capacity(data_len as usize);
417        for part_idx in start_part..start_part + num_parts {
418            if let Some(part_data) = parts_map.get(&part_idx) {
419                let part_start = part_idx as i64 * PART_DATA_SIZE as i64;
420                let skip = if part_start < start_idx {
421                    (start_idx - part_start) as usize
422                } else {
423                    0
424                };
425                let remaining = data_len as usize - result.len();
426                let take = remaining.min(part_data.len() - skip);
427                result.extend_from_slice(&part_data[skip..skip + take]);
428            }
429        }
430
431        let _ = (num_parts, start_part); // used in loop above
432        Ok(Some(result))
433    }
434
435    /// Append data to the end of a file.
436    pub fn append_data(
437        &self,
438        zone_id: &str,
439        name: &str,
440        data: &[u8],
441    ) -> Result<(), StoreError> {
442        if data.is_empty() {
443            return Ok(());
444        }
445
446        let key = (zone_id.to_string(), name.to_string());
447        let now = Self::now_ms();
448
449        let file = self.stat(zone_id, name)?.ok_or(StoreError::NotFound)?;
450        let new_size = file.size + data.len() as i64;
451
452        // Figure out which part to start writing at
453        let start_offset = file.size;
454        let start_part = (start_offset / PART_DATA_SIZE as i64) as i32;
455        let offset_in_part = (start_offset % PART_DATA_SIZE as i64) as usize;
456
457        // Load the last part if we need to append to it
458        let conn = self.conn.lock().unwrap();
459        let mut data_offset = 0usize;
460        let mut current_part = start_part;
461
462        if offset_in_part > 0 {
463            // Load existing partial part
464            let existing: Option<Vec<u8>> = conn
465                .query_row(
466                    "SELECT data FROM db_file_data WHERE zoneid = ?1 AND name = ?2 AND partidx = ?3",
467                    params![zone_id, name, start_part],
468                    |row| row.get(0),
469                )
470                .ok();
471
472            let mut part_data = existing.unwrap_or_default();
473            let space = PART_DATA_SIZE - part_data.len();
474            let to_copy = space.min(data.len());
475            part_data.extend_from_slice(&data[..to_copy]);
476            data_offset = to_copy;
477
478            conn.execute(
479                "REPLACE INTO db_file_data (zoneid, name, partidx, data) VALUES (?1, ?2, ?3, ?4)",
480                params![zone_id, name, current_part, part_data],
481            )?;
482            current_part += 1;
483        }
484
485        // Write remaining full parts
486        while data_offset < data.len() {
487            let end = (data_offset + PART_DATA_SIZE).min(data.len());
488            let part_data = &data[data_offset..end];
489            conn.execute(
490                "REPLACE INTO db_file_data (zoneid, name, partidx, data) VALUES (?1, ?2, ?3, ?4)",
491                params![zone_id, name, current_part, part_data],
492            )?;
493            data_offset = end;
494            current_part += 1;
495        }
496
497        // Update file size
498        conn.execute(
499            "UPDATE db_wave_file SET size = ?1, modts = ?2 WHERE zoneid = ?3 AND name = ?4",
500            params![new_size, now, zone_id, name],
501        )?;
502        drop(conn);
503
504        // Update cache
505        {
506            let new_size_bytes = (new_size as usize).max(64);
507            let mut cache = self.cache.lock().unwrap();
508            if let Some(entry) = cache.get_mut(&key) {
509                let old_size = entry.cached_size_bytes;
510                if let Some(ref mut f) = entry.file {
511                    f.size = new_size;
512                    f.modts = now;
513                }
514                entry.last_access_ms = now;
515                entry.cached_size_bytes = new_size_bytes;
516                let delta = new_size_bytes as i64 - old_size as i64;
517                let mut total = self.cache_total_bytes.lock().unwrap();
518                if delta >= 0 {
519                    *total += delta as usize;
520                } else {
521                    *total = total.saturating_sub((-delta) as usize);
522                }
523            }
524        }
525        self.evict_to_cap();
526
527        Ok(())
528    }
529
530    /// Write metadata. If `merge` is true, only specified keys are updated;
531    /// otherwise the entire metadata map is replaced.
532    pub fn write_meta(
533        &self,
534        zone_id: &str,
535        name: &str,
536        meta: FileMeta,
537        merge: bool,
538    ) -> Result<(), StoreError> {
539        let key = (zone_id.to_string(), name.to_string());
540        let now = Self::now_ms();
541
542        let file = self.stat(zone_id, name)?.ok_or(StoreError::NotFound)?;
543
544        let new_meta = if merge {
545            let mut merged = file.meta.clone();
546            for (k, v) in meta {
547                if v.is_null() {
548                    merged.remove(&k);
549                } else {
550                    merged.insert(k, v);
551                }
552            }
553            merged
554        } else {
555            meta
556        };
557
558        let meta_json = serde_json::to_string(&new_meta)?;
559        let conn = self.conn.lock().unwrap();
560        conn.execute(
561            "UPDATE db_wave_file SET meta = ?1, modts = ?2 WHERE zoneid = ?3 AND name = ?4",
562            params![meta_json, now, zone_id, name],
563        )?;
564        drop(conn);
565
566        // Update cache (metadata write doesn't change file.size, so cached_size_bytes unchanged)
567        let mut cache = self.cache.lock().unwrap();
568        if let Some(entry) = cache.get_mut(&key) {
569            if let Some(ref mut f) = entry.file {
570                f.meta = new_meta;
571                f.modts = now;
572            }
573            entry.last_access_ms = now;
574        }
575
576        Ok(())
577    }
578
579    /// List all files in a zone.
580    #[allow(dead_code)]
581    pub fn list_files(&self, zone_id: &str) -> Result<Vec<WaveFile>, StoreError> {
582        let conn = self.conn.lock().unwrap();
583        let mut stmt = conn.prepare(
584            "SELECT zoneid, name, size, createdts, modts, opts, meta FROM db_wave_file WHERE zoneid = ?1",
585        )?;
586        let rows = stmt.query_map(params![zone_id], |row| {
587            let opts_str: String = row.get(5)?;
588            let meta_str: String = row.get(6)?;
589            Ok(WaveFile {
590                zoneid: row.get(0)?,
591                name: row.get(1)?,
592                size: row.get(2)?,
593                createdts: row.get(3)?,
594                modts: row.get(4)?,
595                opts: serde_json::from_str(&opts_str).unwrap_or_default(),
596                meta: serde_json::from_str(&meta_str).unwrap_or_default(),
597            })
598        })?;
599
600        rows.collect::<Result<Vec<_>, _>>()
601            .map_err(StoreError::Sqlite)
602    }
603
604    /// Get all zone IDs that have files.
605    #[allow(dead_code)]
606    pub fn get_all_zone_ids(&self) -> Result<Vec<String>, StoreError> {
607        let conn = self.conn.lock().unwrap();
608        let mut stmt = conn.prepare("SELECT DISTINCT zoneid FROM db_wave_file")?;
609        let rows = stmt.query_map([], |row| row.get(0))?;
610        rows.collect::<Result<Vec<_>, _>>()
611            .map_err(StoreError::Sqlite)
612    }
613
614    /// Flush dirty cache entries to the database and evict stale clean entries.
615    /// Returns (files_flushed, parts_flushed).
616    #[allow(dead_code)]
617    pub fn flush_cache(&self) -> Result<(usize, usize), StoreError> {
618        let ttl_ms = (CACHE_TTL_SECS * 1000) as i64;
619        let now = Self::now_ms();
620        let cutoff_ms = now - ttl_ms;
621
622        let (dirty_keys, stale_keys): (Vec<_>, Vec<_>) = {
623            let cache = self.cache.lock().unwrap();
624            let dirty = cache
625                .iter()
626                .filter(|(_, e)| e.dirty)
627                .map(|(k, _)| k.clone())
628                .collect();
629            let stale = cache
630                .iter()
631                .filter(|(_, e)| !e.dirty && e.last_access_ms < cutoff_ms)
632                .map(|(k, _)| k.clone())
633                .collect();
634            (dirty, stale)
635        };
636
637        // Evict stale clean entries — they're already persisted in DB.
638        if !stale_keys.is_empty() {
639            let mut freed = 0usize;
640            let mut cache = self.cache.lock().unwrap();
641            for key in &stale_keys {
642                if let Some(removed) = cache.remove(key) {
643                    freed += removed.cached_size_bytes;
644                }
645            }
646            if freed > 0 {
647                let mut total = self.cache_total_bytes.lock().unwrap();
648                *total = total.saturating_sub(freed);
649            }
650            tracing::debug!("filestore cache: evicted {} stale entries ({} bytes)", stale_keys.len(), freed);
651        }
652
653        let mut files_flushed = 0;
654        let mut parts_flushed = 0;
655
656        for key in dirty_keys {
657            let entry = {
658                let mut cache = self.cache.lock().unwrap();
659                let entry = cache.remove(&key);
660                if let Some(ref e) = entry {
661                    let mut total = self.cache_total_bytes.lock().unwrap();
662                    *total = total.saturating_sub(e.cached_size_bytes);
663                }
664                entry
665            };
666
667            if let Some(entry) = entry {
668                if let Some(ref file) = entry.file {
669                    let conn = self.conn.lock().unwrap();
670                    let meta_json = serde_json::to_string(&file.meta)?;
671                    conn.execute(
672                        "UPDATE db_wave_file SET size = ?1, modts = ?2, meta = ?3 WHERE zoneid = ?4 AND name = ?5",
673                        params![file.size, file.modts, meta_json, file.zoneid, file.name],
674                    )?;
675
676                    for data_entry in entry.data_entries.values() {
677                        conn.execute(
678                            "REPLACE INTO db_file_data (zoneid, name, partidx, data) VALUES (?1, ?2, ?3, ?4)",
679                            params![file.zoneid, file.name, data_entry.part_idx, data_entry.data],
680                        )?;
681                        parts_flushed += 1;
682                    }
683                    files_flushed += 1;
684                }
685            }
686        }
687
688        Ok((files_flushed, parts_flushed))
689    }
690
691    /// Split data into PART_DATA_SIZE chunks.
692    fn split_into_parts(data: &[u8]) -> Vec<Vec<u8>> {
693        if data.is_empty() {
694            return Vec::new();
695        }
696        data.chunks(PART_DATA_SIZE)
697            .map(|chunk| chunk.to_vec())
698            .collect()
699    }
700
701    /// Start background flusher (call from async context).
702    #[allow(dead_code)]
703    pub fn start_flusher(self: &Arc<Self>) -> tokio::task::JoinHandle<()> {
704        let store = Arc::clone(self);
705        tokio::spawn(async move {
706            let mut interval =
707                tokio::time::interval(std::time::Duration::from_secs(DEFAULT_FLUSH_SECS));
708            loop {
709                interval.tick().await;
710                match store.flush_cache() {
711                    Ok((files, parts)) => {
712                        if files > 0 {
713                            tracing::debug!(
714                                "filestore flush: {} files, {} parts",
715                                files,
716                                parts
717                            );
718                        }
719                    }
720                    Err(e) => {
721                        tracing::error!("filestore flush error: {}", e);
722                    }
723                }
724            }
725        })
726    }
727}